package com.gemini.main.rocketmq;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class OrderedProducer {

    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {


        //Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("qifeizn-rocketmq-test");

        producer.setNamesrvAddr("localhost:9876");
        //Launch the instance.
        producer.start();

        String[] tags = {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 1; i++) {
            int orderId = i % 10;
            //Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("TopicTestOrdered", tags[0], "KEY" + i,
                    ("你是谁啊 " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));

            SendResult sendResult = producer.send(msg, new MessageQueueSelector() {

                /**
                 *
                 * @param mqs
                 * @param msg
                 * @param arg arg 参数就是传入的orderId，通过相同的orderId选择一个相同的队列来实现FIFO
                 * @return
                 */
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    System.out.println(arg);
                    Integer id = (Integer) arg;
                    int index = id % mqs.size();
                    return mqs.get(index);
                }
            }, orderId);

            System.out.printf("%s%n", sendResult);
        }
    }
}

class OrderedConsumer {

    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");

        //consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

        // Specify name server addresses.
        consumer.setNamesrvAddr("127.0.0.1:9876");

        consumer.subscribe("TopicTestOrdered", "TagA || TagC || TagD");

        // 集群模式
        consumer.setMessageModel(MessageModel.CLUSTERING);

        /**
         * 设置消费端线程数固定为 4-8
         */
        consumer.setConsumeThreadMax(8);
        consumer.setConsumeThreadMin(4);

        consumer.setMaxReconsumeTimes(2);

        consumer.registerMessageListener(new MessageListenerOrderly() {

            AtomicLong consumeTimes = new AtomicLong(0);
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(false);
                System.out.printf(Thread.currentThread().getName() + "OrderedConsumer1 Receive New Messages: " + new String(msgs.get(0).getBody())+ "%n");
                this.consumeTimes.incrementAndGet();
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        consumer.start();

        System.out.printf("Consumer Started.%n");

    }
}

class OrderedConsumer2 {

    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name2");

        //consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

        // 集群模式
        consumer.setMessageModel(MessageModel.CLUSTERING);

        // Specify name server addresses.
        consumer.setNamesrvAddr("127.0.0.1:9876");

        consumer.subscribe("TopicTestOrdered", "TagA || TagC || TagD");

        /**
         * 设置消费端线程数固定为 4-8
         */
        consumer.setConsumeThreadMax(8);
        consumer.setConsumeThreadMin(4);

        consumer.setMaxReconsumeTimes(2);

        consumer.registerMessageListener(new MessageListenerOrderly() {

            AtomicLong consumeTimes = new AtomicLong(0);
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                //context.setAutoCommit(false);
                // 一定要commit 不然重启会重复消费，官网的例子是在真的坑人啊
                System.out.printf(Thread.currentThread().getName() + " OrderedConsumer2 Receive New Messages: " + msgs + "%n");
                this.consumeTimes.incrementAndGet();
                /*if ((this.consumeTimes.get() % 2) == 0) {
                    return ConsumeOrderlyStatus.SUCCESS;
                } else if ((this.consumeTimes.get() % 3) == 0) {
                    return ConsumeOrderlyStatus.ROLLBACK;
                } else if ((this.consumeTimes.get() % 4) == 0) {
                    return ConsumeOrderlyStatus.COMMIT;
                } else if ((this.consumeTimes.get() % 5) == 0) {
                    context.setSuspendCurrentQueueTimeMillis(3000);
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }*/
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        consumer.start();

        System.out.printf("Consumer Started.%n");

    }
}
